/**
 * Copyright (C) @2014 Webank Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.webank.framework.biz.server;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadFactory;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import cn.webank.framework.biz.dto.ServerInfo;
import cn.webank.framework.exception.SysException;
import cn.webank.framework.message.integration.RMBSao;
import cn.webank.framework.message.subscriber.PullMessageSubscriber;

/**
 * Base class for back-end service servers.
 * 
 * @author jonyang
 *
 */
abstract public class WeBankBaseServer {
	// Class-wide SLF4J logger.
	private final static Logger LOG = LoggerFactory.getLogger(WeBankBaseServer.class);

	/**
	 * Command-line action name for starting the server.
	 */
	public final static String COMMAND_ACTION_START = "start";
	/**
	 * Command-line action name for stopping the server.
	 */
	public final static String COMMAND_ACTION_STOP = "stop";

	/**
	 * Spring application context; closed by stopServer() when it is not null.
	 */
	protected ClassPathXmlApplicationContext context;

	/**
	 * Thread factory responsible for creating threads.
	 */
	private ThreadFactory threadFactory;
	/**
	 * Worker thread pools; each one is shut down when the server stops.
	 */
	private List<ThreadPoolTaskExecutor> taskThreadPools;
	/**
	 * Whether the server has been stopped. Volatile because it is read by the
	 * await() loop and written by stopServer(), which may also run on the JVM
	 * shutdown-hook thread.
	 */
	private volatile boolean isShutDown = false;

	/**
	 * RMB subscribers registered through registerSubscriber().
	 */
	private List<PullMessageSubscriber> subscribers = new ArrayList<PullMessageSubscriber>();

	/**
	 * Server address: await() binds the shutdown listener to it and
	 * shutdownServer() connects to it.
	 */
	private String address;

	/**
	 * Server shutdown port (the default is overwritten by every constructor).
	 */
	private int port = 8005;

	/**
	 * Server shutdown command string (the default is overwritten by every
	 * constructor).
	 */
	private String shutdown = "SHUTDOWN";

	/**
	 * Random number generator, created lazily by await() only when the
	 * shutdown command is longer than the default read cut-off.
	 */
	private Random random = null;

	// Connect timeout in milliseconds used by shutdownServer() when contacting the running server.
	protected int connectTimeoutMillSeconds = 5000;

	// Socket timeout in milliseconds, exposed through its getter/setter.
	protected int soTimeoutMillSeconds = 5000;

	/**
	 * @return the socket timeout in milliseconds
	 */
	public int getSoTimeoutMillSeconds() {
		return this.soTimeoutMillSeconds;
	}

	/**
	 * @param soTimeoutMillSeconds
	 *            the socket timeout in milliseconds to set
	 */
	public void setSoTimeoutMillSeconds(int soTimeoutMillSeconds) {
		this.soTimeoutMillSeconds = soTimeoutMillSeconds;
	}

	/**
	 * @return the connect timeout in milliseconds
	 */
	public int getConnectTimeoutMillSeconds() {
		return this.connectTimeoutMillSeconds;
	}

	/**
	 * @param connectTimeoutMillSeconds
	 *            the connect timeout in milliseconds to set
	 */
	public void setConnectTimeoutMillSeconds(int connectTimeoutMillSeconds) {
		this.connectTimeoutMillSeconds = connectTimeoutMillSeconds;
	}

	/**
	 * @return the Spring application context held by this server, possibly
	 *         {@code null}
	 */
	public ClassPathXmlApplicationContext getContext() {
		return this.context;
	}

	/**
	 * @param context
	 *            the Spring application context to set
	 */
	public void setContext(ClassPathXmlApplicationContext context) {
		this.context = context;
	}

	/**
	 * RMB service: used to register and deregister subscribers, and destroyed
	 * when the server stops.
	 */
	private RMBSao rmbSao;

	/**
	 * Creates a server without a Spring context; no RMB service, thread pools
	 * or thread factory are looked up.
	 * 
	 * @param address
	 *            server address
	 * @param port
	 *            shutdown port
	 * @param shutdownCommand
	 *            shutdown command string
	 */
	public WeBankBaseServer(String address, int port, String shutdownCommand) {
		this(address, port, shutdownCommand, (ClassPathXmlApplicationContext) null);
	}

	/**
	 * Creates a server from explicitly supplied collaborators; nothing is
	 * looked up from the Spring context.
	 * 
	 * @param address
	 *            server address
	 * @param port
	 *            shutdown port
	 * @param shutdownCommand
	 *            shutdown command string
	 * @param rmbSao
	 *            RMB service
	 * @param taskExecutors
	 *            worker thread pools
	 * @param threadFactory
	 *            thread factory
	 * @param context
	 *            Spring application context
	 */
	public WeBankBaseServer(String address, int port, String shutdownCommand, RMBSao rmbSao,
			List<ThreadPoolTaskExecutor> taskExecutors, ThreadFactory threadFactory,
			ClassPathXmlApplicationContext context) {
		// Shutdown endpoint settings.
		this.address = address;
		this.port = port;
		this.shutdown = shutdownCommand;

		// Collaborators.
		this.context = context;
		this.rmbSao = rmbSao;
		this.threadFactory = threadFactory;
		this.taskThreadPools = taskExecutors;
	}

	/**
	 * Creates a server and, when a Spring context is supplied, resolves the
	 * RMB service, all {@link ThreadPoolTaskExecutor} beans and the thread
	 * factory from it by type.
	 * 
	 * @param address
	 *            server address
	 * @param port
	 *            shutdown port
	 * @param shutdownCommand
	 *            shutdown command string
	 * @param context
	 *            Spring application context; may be {@code null}, in which
	 *            case no beans are looked up
	 */
	public WeBankBaseServer(String address, int port, String shutdownCommand, ClassPathXmlApplicationContext context) {
		this.address = address;
		this.port = port;
		this.shutdown = shutdownCommand;
		this.context = context;

		if (context == null) {
			return;
		}

		this.rmbSao = context.getBean(RMBSao.class);

		Map<String, ThreadPoolTaskExecutor> executorBeans = context.getBeansOfType(ThreadPoolTaskExecutor.class);
		this.taskThreadPools = new ArrayList<ThreadPoolTaskExecutor>(executorBeans.values());

		// NOTE(review): getBean(Class) normally throws instead of returning
		// null when no bean exists, so this null check is probably redundant
		// -- confirm before removing.
		ThreadFactory contextThreadFactory = context.getBean(ThreadFactory.class);
		if (contextThreadFactory != null) {
			this.threadFactory = contextThreadFactory;
		}
	}

	/**
	 * @return the RMB service used by this server, possibly {@code null}
	 */
	public RMBSao getRmbSao() {
		return this.rmbSao;
	}

	/**
	 * @param rmbSao
	 *            the RMB service to use
	 */
	public void setRmbSao(RMBSao rmbSao) {
		this.rmbSao = rmbSao;
	}

	/**
	 * Remembers the subscriber locally and registers it with the RMB service.
	 * Does nothing when there is no RMB service or the subscriber is
	 * {@code null}.
	 * 
	 * @param subscriber
	 *            the subscriber to register
	 */
	public void registerSubscriber(PullMessageSubscriber subscriber) {
		if (rmbSao == null || subscriber == null) {
			return;
		}
		subscribers.add(subscriber);
		rmbSao.registerSubscriber(subscriber);
	}

	/**
	 * Deregisters every locally remembered subscriber from the RMB service.
	 * The local list itself is left untouched. Does nothing when there is no
	 * RMB service.
	 */
	public void unregisterSubscriber() {
		if (rmbSao == null) {
			return;
		}
		for (PullMessageSubscriber registered : subscribers) {
			rmbSao.deregisterSubscriber(registered);
		}
	}

	/**
	 * Starts the server and blocks until it is shut down.
	 * <p>
	 * Runs {@link #doStart()}, registers a JVM shutdown hook that calls
	 * stopServer(), then blocks in {@link #await()} until the shutdown
	 * command arrives. If anything fails, the server is stopped and the
	 * original failure is rethrown; a failure during that cleanup is logged
	 * and attached to the original as a suppressed exception instead of
	 * replacing it.
	 * 
	 * @throws Exception
	 *             whatever doStart() or await() threw
	 */
	private void startServer() throws Exception {
		try {
			doStart();

			// Release resources even when the JVM exits without a shutdown command.
			Runtime.getRuntime().addShutdownHook(new Thread() {
				@Override
				public void run() {
					try {
						stopServer();
					} catch (Exception e) {
						LOG.error("shutdown error", e);
					}
				}
			});

			isShutDown = false;
			LOG.info("Server start now !!!");

			// Blocks the calling thread until a valid shutdown command is received.
			this.await();
		} catch (Throwable e) {
			try {
				this.stopServer();
			} catch (Exception stopFailure) {
				// Keep the root cause visible: the cleanup failure must not mask it.
				e.addSuppressed(stopFailure);
				LOG.error("stop server after failure error", stopFailure);
			}
			throw e;
		}

	}

	/**
	 * Stops the server and releases its resources.
	 * <p>
	 * On the first call it runs {@link #doStop()}, closes and deregisters all
	 * subscribers, destroys the RMB service and shuts down the worker thread
	 * pools. The Spring context is closed on every call when it is present.
	 * <p>
	 * The shutdown flag is set before doStop() runs and the resources are
	 * released in {@code finally} blocks, so a failing doStop() neither leaks
	 * them nor gets executed again by a later call.
	 * 
	 * @throws Exception
	 *             if doStop() or releasing a resource fails
	 */
	private void stopServer() throws Exception {
		try {
			if (!isShutDown) {
				isShutDown = true;
				try {
					doStop();
				} finally {
					releaseServerResources();
				}
			}
		} finally {
			if (this.context != null) {
				this.context.close();
			}
		}

	}

	/**
	 * Releases the RMB subscribers and service, then the worker thread pools;
	 * the pools are shut down even if the RMB cleanup fails.
	 */
	private void releaseServerResources() throws Exception {
		try {
			if (rmbSao != null) {
				if (subscribers != null) {
					for (PullMessageSubscriber m : subscribers) {
						m.setOpen(false);
						rmbSao.deregisterSubscriber(m);
					}
				}

				rmbSao.destroy();
			}
		} finally {
			if (taskThreadPools != null) {
				for (ThreadPoolTaskExecutor t : taskThreadPools) {
					t.shutdown();
				}
			}
		}
	}

	/**
	 * @return the thread factory, possibly {@code null}
	 */
	public ThreadFactory getThreadFactory() {
		return this.threadFactory;
	}

	/**
	 * @param threadFactory
	 *            the thread factory to set
	 */
	public void setThreadFactory(ThreadFactory threadFactory) {
		this.threadFactory = threadFactory;
	}

	/**
	 * @return the worker thread pools, possibly {@code null}
	 */
	public List<ThreadPoolTaskExecutor> getTaskThreadPools() {
		return this.taskThreadPools;
	}

	/**
	 * @param taskThreadPools
	 *            the worker thread pools to set; they are shut down when the
	 *            server stops
	 */
	public void setTaskThreadPools(List<ThreadPoolTaskExecutor> taskThreadPools) {
		this.taskThreadPools = taskThreadPools;
	}

	/**
	 * Sends the shutdown command to a running server listening on the
	 * configured address and port.
	 * <p>
	 * This is best effort: any failure (connection refused, timeout, ...) is
	 * logged and not propagated to the caller.
	 * 
	 * @throws Exception
	 *             declared for interface compatibility; failures are logged
	 *             rather than thrown
	 */
	public void shutdownServer() throws Exception {
		// try-with-resources closes the socket (and with it the stream) on every path.
		try (Socket socket = new Socket()) {
			socket.connect(new InetSocketAddress(address, port), connectTimeoutMillSeconds);
			// Use the configurable socket timeout rather than a hard-coded value.
			socket.setSoTimeout(soTimeoutMillSeconds);
			OutputStream stream = socket.getOutputStream();

			// One byte per character, matching the byte-wise read in await().
			for (int i = 0; i < shutdown.length(); i++) {
				stream.write(shutdown.charAt(i));
			}
			stream.flush();
		} catch (Exception e) {
			LOG.error("shutdown server error", e);
		}

	}

	/**
	 * Waits until a proper shutdown command is received, then stops the
	 * server and returns. This keeps the calling thread blocked while the
	 * server is running.
	 * <p>
	 * A server socket is bound to the configured address and port. Each
	 * accepted connection is read until a control character, end of stream,
	 * read error or the length cut-off; the text read is compared with the
	 * shutdown command. A match calls stopServer() and ends the loop, anything
	 * else is logged as an invalid command.
	 * 
	 * @throws SysException
	 *             if the server socket cannot be created or stopping the
	 *             server fails
	 */
	public void await() {
		// Set up a server socket to wait on
		ServerSocket awaitSocket;
		try {
			awaitSocket = new ServerSocket(port, 1, InetAddress.getByName(address));
		} catch (IOException e) {
			LOG.error("StandardServer.await: create[" + address + ":" + port + "]: ", e);
			throw new SysException("create server error", e);
		}

		try {
			// Loop waiting for a connection and a valid command
			while (!isShutDown) {
				String command;
				Socket socket = null;
				try {
					InputStream stream;
					try {
						// Wait for the next connection
						socket = awaitSocket.accept();
						socket.setSoTimeout(10 * 1000); // Ten seconds
						stream = socket.getInputStream();
					} catch (AccessControlException ace) {
						LOG.warn("StandardServer.accept security exception: " + ace.getMessage(), ace);
						continue;
					} catch (IOException e) {
						// When already shut down the accept was aborted on
						// purpose; otherwise report the failure. Either way
						// the wait loop ends.
						if (!isShutDown) {
							LOG.error("StandardServer.await: accept: ", e);
						}
						break;
					}

					command = readCommand(stream);
				} finally {
					// Close the socket now that we are done with it
					if (socket != null) {
						try {
							socket.close();
						} catch (IOException ignored) {
							// Nothing useful can be done about a failed close.
						}
					}
				}

				// Match against our command string
				if (command.equals(shutdown)) {
					stopServer();
					LOG.info("shutdown server now!!!");
					break;
				}
				LOG.warn("StandardServer.await: Invalid command '" + command + "' received");
			}
		} catch (Exception e) {
			throw new SysException("other server error", e);
		} finally {
			// Close the server socket and return
			try {
				awaitSocket.close();
			} catch (IOException ignored) {
				// Nothing useful can be done about a failed close.
			}
		}
	}

	/**
	 * Reads one command from the stream: characters are collected until a
	 * control character (code below 32), end of stream, a read error or the
	 * length cut-off is reached.
	 */
	private String readCommand(InputStream stream) {
		int expected = 1024; // Cut off to avoid DoS attack

		// Only grow the cut-off when the command is longer than the default.
		// nextInt(1024) is never negative, so the cut-off cannot shrink.
		while (expected < shutdown.length()) {
			if (random == null) {
				random = new Random();
			}
			expected += random.nextInt(1024);
		}

		StringBuilder command = new StringBuilder();
		while (expected > 0) {
			int ch;
			try {
				ch = stream.read();
			} catch (IOException e) {
				LOG.warn("StandardServer.await: read: ", e);
				ch = -1;
			}

			// Control character or EOF terminates the command
			if (ch < 32) {
				break;
			}

			command.append((char) ch);
			expected--;
		}
		return command.toString();
	}

	/**
	 * Prints the command-line usage to standard output.
	 * 
	 * @param serverClassName
	 *            class name shown in the usage line
	 */
	public static void help(String serverClassName) {
		// The space before "-d" keeps the class name and the first option apart.
		System.out.println("java -cp " + serverClassName + " -d host -p port -s shutdownstring [start|stop]");
	}

	/**
	 * Parses the command line into a {@link ServerInfo}.
	 * <p>
	 * Expected form: {@code -d host -p port -s shutdownstring <command>}. The
	 * last positional argument is taken as the command; it is trimmed and
	 * lower-cased before being mapped to a {@code ServerCommandEnum}.
	 * 
	 * @param args
	 *            raw command-line arguments
	 * @return the parsed server information
	 * @throws IllegalArgumentException
	 *             if one of the options -d, -p, -s or the command is missing
	 *             (the usage is printed first), or if the port is not a
	 *             number
	 * @throws Exception
	 *             if the command line cannot be parsed
	 */
	protected static ServerInfo parseArgs(String[] args) throws Exception {
		Options options = new Options();

		Option host = Option.builder("d").argName("host").hasArg().desc("server host").build();

		Option port = Option.builder("p").argName("port").hasArg().desc("server port").build();

		Option shutdown = Option.builder("s").argName("shutdown").hasArg().desc("server shutdown command").build();
		options.addOption(host).addOption(port).addOption(shutdown);

		CommandLineParser parser = new DefaultParser();
		// parse the command line arguments
		CommandLine cl = parser.parse(options, args);
		if (!cl.hasOption("d") || !cl.hasOption("p") || !cl.hasOption("s")) {
			help(WeBankBaseServer.class.getName());
			// Fail here with a clear message instead of a later parseInt(null).
			throw new IllegalArgumentException("missing required option: -d, -p and -s are all mandatory");
		}

		String[] positional = cl.getArgs();
		if (positional.length == 0) {
			help(WeBankBaseServer.class.getName());
			// Without this check the index below would be -1.
			throw new IllegalArgumentException("missing command: expected [start|stop]");
		}

		String addressValue = cl.getOptionValue("d");
		int portValue = Integer.parseInt(cl.getOptionValue("p"));
		String shutdownValue = cl.getOptionValue("s");
		String commandValue = positional[positional.length - 1].trim();

		ServerInfo serverInfo = new ServerInfo();
		serverInfo.setAddress(addressValue);
		serverInfo.setCommand(ServerCommandEnum.getServerCommandEnum(commandValue.toLowerCase()));
		serverInfo.setPort(portValue);
		serverInfo.setShutdown(shutdownValue);

		return serverInfo;

	}

	/**
	 * Hook for work to perform during start-up; called by startServer()
	 * before it begins waiting for the shutdown command.
	 * 
	 * @throws Exception
	 *             on any start-up failure
	 */
	public abstract void doStart() throws Exception;

	/**
	 * Hook for work to perform before shutting down; called by stopServer().
	 * 
	 * @throws Exception
	 *             on any shutdown failure
	 */
	public abstract void doStop() throws Exception;

	/**
	 * Hook that shows usage hints; called by service() for the HELP command.
	 * 
	 * @throws Exception
	 *             on any failure
	 */
	public abstract void doHelp() throws Exception;

	/**
	 * Dispatches a server command.
	 * <ul>
	 * <li>START: start the server and wait for the shutdown command</li>
	 * <li>STOP: send the shutdown command to the running server</li>
	 * <li>HELP, or a {@code null} command: show the help</li>
	 * </ul>
	 * Any other command is ignored.
	 * 
	 * @param command
	 *            the command to execute; {@code null} is treated as a request
	 *            for help instead of failing with a NullPointerException
	 * @throws Exception
	 *             if the selected action fails
	 */
	public void service(ServerCommandEnum command) throws Exception {
		if (command == null) {
			// NOTE(review): presumably an unrecognised command string yields
			// null from ServerCommandEnum.getServerCommandEnum -- confirm.
			this.doHelp();
			return;
		}

		if (ServerCommandEnum.START.equals(command)) {
			this.startServer();// start the server and wait for the shutdown command
		} else if (ServerCommandEnum.STOP.equals(command)) {
			this.shutdownServer();// send the shutdown command to the running server
		} else if (ServerCommandEnum.HELP.equals(command)) {
			this.doHelp();
		}
	}

	/**
	 * Example entry point, for reference only: parses the arguments, builds a
	 * minimal anonymous server without a Spring context and runs the
	 * requested command. Exits with status 0 on success and 1 on failure.
	 * 
	 * @param args
	 *            command-line arguments, see {@link #help(String)}
	 * @throws Exception
	 *             declared only; exceptions are reported on standard error
	 */
	public static void main(String[] args) throws Exception {
		try {
			final ServerInfo parsed = WeBankBaseServer.parseArgs(args);
			final ServerCommandEnum requested = parsed.getCommand();

			final WeBankBaseServer exampleServer = new WeBankBaseServer(parsed.getAddress(), parsed.getPort(),
					parsed.getShutdown(), null) {
				@Override
				public void doStart() throws Exception {
					// A null subscriber is ignored; a real server registers its own here.
					this.registerSubscriber(null);
				}

				@Override
				public void doStop() throws Exception {
					// Nothing to release in the example.
				}

				@Override
				public void doHelp() throws Exception {
					help(WeBankBaseServer.class.getName());
				}
			};

			exampleServer.service(requested);
			System.exit(0);
		} catch (Exception failure) {
			System.err.println("Server failed.  Reason: " + failure.getMessage());
			System.exit(1);
		}

	}
}
